package com.elastic.search.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.elastic.search.event.CanalEvent;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * @author huangdeyao
 */
public abstract class AbstractCanalListener<EVENT extends CanalEvent> implements ApplicationListener<EVENT> {

    private static final Logger logger = LoggerFactory.getLogger(AbstractCanalListener.class);

    static final String ES_ID = "id";

    private static final String STATUS = "status";


    private static final int ONE = 0;

    @Override
    public void onApplicationEvent(EVENT event) {
        Entry entry = event.getEntry();
        String database = entry.getHeader().getSchemaName();
        String table = entry.getHeader().getTableName();
        RowChange change;
        try {
            change = RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            logger.error("canalEntry_parser_error,根据CanalEntry获取RowChange失败！", e);
            return;
        }
        change.getRowDatasList().forEach(rowData -> doSync(database, table, database + "_" + table, rowData));
    }

    /**
     * 异步执行数据传递
     *
     * @param database 数据库
     * @param table    表
     * @param index    ES索引
     * @param rowData  更改的数据
     */
    protected abstract void doSync(String database, String table, String index, RowData rowData);

    /**
     * 组装ES map对象
     *
     * @param columns 解析出来的表字段
     * @return
     */
    Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> jsonMap = new HashMap<>(columns.size());
        columns.forEach(column -> {
            if (column == null) {
                return;
            }
            if (column.getName().equals(STATUS) && Integer.valueOf(column.getValue()) == ONE) {
                jsonMap.put(ES_ID, null);
                return;
            }
            jsonMap.put(column.getName(), column.getIsNull() ? null : column.getValue());
        });
        return jsonMap;
    }
}
